1 module bloomberg_dl.bloomberg_dl;
2 import libssh.session;
3 import libssh.sftp;
4 import libssh.errors;
5 import std.array;
6 import std.conv;
7 import std.datetime.systime : SysTime, Clock;
8 import std.datetime : UTC;
9 import std.zlib;
10 import std.algorithm;
11 import std.file;
12 import std.string : toStringz, fromStringz;
13 version(Windows)
14 {
15 	import core.stdc.stdio;
16 }
17 else
18 {
19 	import core.sys.posix.fcntl;
20 }
21 
22 
// Implemented on the C++ side. converTalbe() calls this once it has copied
// every cell of a table returned by soapGetData()/soapGetHistorical() into
// D strings.
// NOTE(review): presumably frees the table and its strings - confirm against
// the C++ implementation. The spelling "Talbe" is part of the linked symbol
// name and must match the C++ definition.
extern (C++)
void releaseTalbe(const (char)*** table);
25 
// C++ entry point for the SOAP variant of a "getdata" request.
// The D overload of the same name in this module passes `fields` and `ident`
// as null-terminated arrays of C strings (built by toStrings()) and hands the
// returned pointer to converTalbe(), which reads it as a null-terminated array
// of null-terminated rows and then releases it with releaseTalbe().
// NOTE(review): the meaning of cert/pass/interval/retry is defined by the C++
// implementation, which is not visible here - confirm there.
extern (C++)
const (char)*** soapGetData
(
        const (char)* host,
        const (char)* cert,
        const (char)* pass,
        const (char)** fields,
        const (char)** ident,
        bool withHeader,
        long interval,
        long retry
);
38 
// C++ entry point for the SOAP variant of a "gethistory" request.
// Same calling convention as the C++ soapGetData(): `fields` and `ident` are
// null-terminated arrays of C strings, and the returned table is consumed and
// released by converTalbe().
// NOTE(review): the encoding of startDate/endDate (the SFTP path in this module
// writes them verbatim into a DATERANGE line) and the meaning of
// source/interval/retry are defined by the C++ implementation - confirm there.
extern (C++)
const (char)*** soapGetHistorical
(
        const (char)* host,
        const (char)* cert,
        const (char)* pass,
        const (char)** fields,
        const (char)** ident,
        const (char)* source,
        long startDate,
        long endDate,
        bool withHeader,
        long interval,
        long retry
);
54 
55 
/**
 * Converts a D string array into a null-terminated array of C strings.
 *
 * Params:
 *   strs = strings to convert; each one is passed through toStringz.
 *
 * Returns: pointer to the first of `strs.length + 1` slots; the last slot
 *          is null so a consumer can find the end without a length.
 */
const (char)** toStrings(ref string[] strs)
{
	// One extra slot holds the terminating null pointer.
	auto cstrs = new const(char)*[strs.length + 1];
	foreach (idx, text; strs)
	{
		cstrs[idx] = toStringz(text);
	}
	cstrs[$ - 1] = null;
	return cstrs.ptr;
}
69 
/**
 * Copies a C table (null-terminated array of null-terminated rows of C
 * strings) into D strings and hands the table back to releaseTalbe().
 *
 * Params:
 *   table = table returned by the C++ side; may be null.
 *
 * Returns: one string[] per row, in order. An empty result is returned for
 *          a null table, in which case releaseTalbe() is not called.
 */
string[][] converTalbe(const (char)*** table)
{
	string[][] result;
	// A null table would otherwise be dereferenced by the loop below.
	if (table is null)
	{
		return result;
	}
	// Hand the table back even if copying is interrupted by an exception.
	scope(exit) releaseTalbe(table);
	for (size_t i = 0; table[i] != null; ++i)
	{
		string[] line;
		for (size_t j = 0; table[i][j] != null; ++j)
		{
			// idup takes an owned copy, so nothing refers to the C memory
			// after the table has been released.
			line ~= fromStringz(table[i][j]).idup;
		}
		result ~= line;
	}
	return result;
}
86 
/**
 * D-friendly overload of the C++ soapGetData(): converts every argument to
 * its C representation, performs the call and copies the returned table
 * into D strings via converTalbe().
 */
string[][] soapGetData(string host, string cert, string pass, string[] fields, string[] idents, bool withHeader,  long interval, long retry)
{
	// Converted in the same left-to-right order as the call arguments.
	auto cHost = toStringz(host);
	auto cCert = toStringz(cert);
	auto cPass = toStringz(pass);
	auto cFields = toStrings(fields);
	auto cIdents = toStrings(idents);
	auto table = soapGetData(cHost, cCert, cPass, cFields, cIdents, withHeader, interval, retry);
	return converTalbe(table);
}
91 
/**
 * D-friendly overload of the C++ soapGetHistorical(): converts every
 * argument to its C representation, performs the call and copies the
 * returned table into D strings via converTalbe().
 */
string[][] soapGetHistorical(string host, string cert, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader, long interval, long retry)
{
	// Converted in the same left-to-right order as the call arguments.
	auto cHost = toStringz(host);
	auto cCert = toStringz(cert);
	auto cPass = toStringz(pass);
	auto cFields = toStrings(fields);
	auto cIdents = toStrings(idents);
	auto cSource = toStringz(source);
	auto table = soapGetHistorical(cHost, cCert, cPass, cFields, cIdents, cSource, startDate, endDate, withHeader, interval, retry);
	return converTalbe(table);
}
96 
/**
 * Static helpers that submit a request file over SFTP (or delegate to the
 * SOAP functions of this module when the host is empty or starts with
 * "http"), wait for the gzip-compressed reply file and turn its data
 * section into rows of strings, CSV text or a file on disk.
 */
class bloomberg_dl
{
private:
	// Request-file skeleton. The *_XXX tokens are placeholders substituted by
	// the get*TemplateString helpers and by requestViaSftp (REPLY_XXX).
	static immutable string fileTemplate = 
"START-OF-FILE
FIRMNAME=USER_XXX
COMPRESS=yes
FILETYPE=pc
REPLYFILENAME=REPLY_XXX
RANG_XXX
PROGRAMNAME=PROGRAM_XXX

START-OF-FIELDSFILED_XXX
END-OF-FIELDS

START-OF-DATADATA_XXX
END-OF-DATA
END-OF-FILE";
	// Separators used by decode_file to cut the data section out of a reply.
	// NOTE(review): Windows builds expect LF and all other builds CRLF, which
	// looks inverted relative to platform conventions - confirm against real
	// reply files before changing.
	version(Windows)
	{
		static immutable string new_line = "\n";
		static immutable string start_split = "START-OF-DATA\n";
		static immutable string end_split = "\nEND-OF-DATA";
	}
	else
	{
		static immutable string new_line = "\r\n";
		static immutable string start_split = "START-OF-DATA\r\n";
		static immutable string end_split = "\r\nEND-OF-DATA";
	}
	// End marker without the leading line break; it directly follows
	// start_split when the data section of a reply is empty.
	static immutable string end_marker = "END-OF-DATA";
private:
	/**
	 * Builds a request name "api_" ~ prefix ~ timestamp, where the timestamp
	 * is the current UTC time packed as year, month, day, hour, minute,
	 * second and millisecond into one decimal number.
	 */
	static string getFileName(string prefix)
	{
		SysTime now = Clock.currTime(UTC());
		long stamp = now.fracSecs.total!"msecs";
		stamp += now.second * 1000L;
		stamp += now.minute * 100000L;
		stamp += now.hour * 10000000L;
		stamp += now.day * 1000000000L;
		stamp += to!long(now.month) * 100000000000L;
		stamp += now.year * 10000000000000L;
		return "api_" ~ prefix ~ to!string(stamp);
	}

	/**
	 * Opens an SSH session to `host` and authenticates with a password.
	 *
	 * Returns: the connected session, or null when password authentication
	 *          does not succeed. The session is disposed before returning
	 *          null and when connecting throws.
	 */
	static SSHSession getSession(string host, string user, string pass)
	{
		auto session = new SSHSession();
		bool authenticated = false;
		// Single clean-up path: covers both a failed authentication and an
		// exception thrown while configuring or connecting.
		scope(exit)
		{
			if (!authenticated)
				session.dispose();
		}
		session.host = host;
		session.user = user;
		session.logVerbosity = LogVerbosity.NoLog;
		session.connect();
		authenticated = AuthState.Success == session.userauthPassword(user, pass);
		return authenticated ? session : null;
	}

	/**
	 * Returns the names of the entries in the remote root directory "/".
	 * Reading stops at end of directory or at the first SSHException.
	 */
	static string[] list(SFTPSession session)
	{
		SFTPAttributes attr;
		string[] names;
		auto dir = session.openDir("/");
		// Close the directory handle on every exit path.
		scope(exit) dir.close();
		while (!dir.eof)
		{
			try
			{
				dir.readdir(attr);
				names ~= attr.name;
			}
			catch(SSHException)
			{
				break;
			}
		}
		return names;
	}

	/**
	 * Polls the remote root directory until `file_name` appears.
	 *
	 * Params:
	 *   interval = seconds to sleep between two attempts.
	 *   retry    = maximum number of directory listings.
	 *
	 * Returns: true as soon as the file is listed, false when all attempts
	 *          are used up (including retry <= 0).
	 */
	static bool wait_file(SFTPSession session, string file_name, long interval = 10, long retry = 100)
	{
		import core.thread;
		for (long attempt = 0; attempt < retry; ++attempt)
		{
			if (list(session).canFind(file_name))
			{
				return true;
			}
			// Sleeping after the final attempt would only delay the failure.
			if (attempt + 1 < retry)
			{
				Thread.sleep( dur!("seconds")(interval));
			}
		}
		return false;
	}

	/**
	 * Fills the user, field list and data list placeholders of fileTemplate.
	 * Every field and identifier is written on its own line. RANG_XXX,
	 * PROGRAM_XXX and REPLY_XXX are left for the callers to substitute.
	 */
	static string getDataTemplateStringBase(string user, string[] fields, string[] idents)
	{
		string strFields = "";
		string strDatas = "";
		foreach (string field; fields)
		{
			strFields ~= "\n";
			strFields ~= field;
		}
		foreach (string ident; idents)
		{
			strDatas ~= "\n";
			strDatas ~= ident;
		}
		string tmpStr = fileTemplate;
		tmpStr = replace(tmpStr, "USER_XXX", user);
		tmpStr = replace(tmpStr, "FILED_XXX", strFields);
		tmpStr = replace(tmpStr, "DATA_XXX", strDatas);
		return tmpStr;
	}

	/// Request text for a "getdata" program; REPLY_XXX is still unresolved.
	static string getDataTemplateString(string user, string[] fields, string[] idents)
	{
		auto result = getDataTemplateStringBase(user, fields, idents);
		result  = replace(result, "RANG_XXX", "SECMASTER=yes");
		result  = replace(result, "PROGRAM_XXX", "getdata");
		return result;
	}

	/**
	 * Request text for a "gethistory" program; REPLY_XXX is still unresolved.
	 * startDate and endDate are written verbatim as "DATERANGE=start|end".
	 */
	static string getHistoricalTemplateString(string user, string[] fields, string[] idents, string source, long startDate, long endDate)
	{
		auto result = getDataTemplateStringBase(user, fields, idents);
		string range = "PRICING_SOURCE=";
		range ~= source;
		range ~= "\nDATERANGE=";
		range ~= to!string(startDate);
		range ~= "|";
		range ~= to!string(endDate);
		range ~= "\nHIST_FORMAT=horizontal\n";
		result  = replace(result, "RANG_XXX", range);
		result  = replace(result, "PROGRAM_XXX", "gethistory");
		return result;
	}

	/**
	 * Uploads `content` to `path`. The data is first written to
	 * path ~ ".tmp" and then renamed, so the final name only appears once
	 * the upload is complete.
	 */
	static void write_file(SFTPSession session, string path, string content)
	{
		auto tmp_path = path ~ ".tmp";
		int access_type = O_WRONLY | O_CREAT | O_TRUNC;
		int mode = S_IRWXU;
		auto file = session.open(tmp_path, access_type, mode);
		{
			// The handle must be closed before the rename and must not stay
			// open when the write throws.
			scope(exit) file.close();
			ubyte[] buffer = cast(ubyte[])content;
			file.write(buffer);
		}
		session.rename(tmp_path, path);
	}

	/**
	 * Downloads the remote file `path` and gunzips it.
	 *
	 * Returns: the decompressed text, or "" when the file cannot be opened,
	 *          cannot be read completely, or is empty. The remote handle is
	 *          closed on all of these paths.
	 */
	static string read_file(SFTPSession session, string path)
	{
		int access_type = O_RDONLY;
		int mode = 0;
		SFTPFile file = null;
		try
		{
			file = session.open(path, access_type, mode);
		}
		catch(SFTPException)
		{
			return "";
		}
		ubyte[] all_buff;
		bool complete = false;
		try
		{
			auto chunk = new ubyte[1024];
			while (true)
			{
				auto cnt = file.read(chunk);
				if (cnt < 0)
				{
					break;
				}
				if (cnt == 0)
				{
					complete = true;
					break;
				}
				all_buff ~= chunk[0 .. cast(size_t) cnt];
			}
		}
		catch(SFTPException)
		{
			// Handled below: `complete` is still false.
		}
		if (!complete)
		{
			// Release the handle, but do not let a close error replace the
			// "" result callers rely on for a failed download.
			try
			{
				file.close();
			}
			catch(SFTPException)
			{
			}
			return "";
		}
		file.close();
		if (all_buff.length == 0)
		{
			return "";
		}
		void[] result_buffer;
		auto uc = new UnCompress(HeaderFormat.gzip);
		result_buffer ~= uc.uncompress(all_buff);
		result_buffer ~= uc.flush();
		return cast(string)result_buffer;
	}

	/**
	 * Downloads a reply file and parses its data section.
	 *
	 * Each line between the start and end markers is split on '|'. Columns
	 * 1 and 2 and the last column are dropped; the remaining columns form
	 * one output row.
	 *
	 * Returns: the parsed rows; empty when the file cannot be read, the
	 *          start marker is missing, or the data section is empty.
	 */
	static string[][] decode_file(SFTPSession session, string path)
	{
		string[][] result;
		auto str = read_file(session, path);
		if (str == "")
		{
			return result;
		}
		str = str.findSplit(start_split)[2];
		auto sections = str.findSplit(end_split);
		// With an empty data section the end marker directly follows the
		// start marker, so end_split (which begins with a line break) is not
		// found and the rest of the file would be parsed as data.
		if (sections[1].length == 0 && str.startsWith(end_marker))
		{
			return result;
		}
		str = sections[0];
		foreach (string line; str.split(new_line))
		{
			auto columns = line.split("|");
			string[] row;
			foreach (idx, column; columns)
			{
				if (idx == 1 || idx == 2 || idx + 1 == columns.length)
				{
					continue;
				}
				row ~= column;
			}
			result ~= row;
		}
		return result;
	}

	/**
	 * Joins every row with ',' and terminates it with "\n".
	 * NOTE(review): cells are written verbatim; a cell containing a comma,
	 * quote or line break is not quoted - confirm consumers tolerate that.
	 */
	static string convertCsv(string[][] data)
	{
		string result;
		foreach (row; data)
		{
			result ~= row.join(",");
			result ~= "\n";
		}
		return result;
	}

	/**
	 * Writes `str` to disk.
	 *
	 * Params:
	 *   level = gzip compression level 1..9; 0 or any value outside 0..9
	 *           writes the text uncompressed to `path`. With compression
	 *           the output goes to path ~ ".gz".
	 */
	static void saveData(string str, string path, int level)
	{
		if (level < 0 || level > 9)
			level = 0;
		if (0 == level)
		{
			std.file.write(path, str);
			return;
		}
		auto cmp = new Compress(level, HeaderFormat.gzip);
		auto result_buffer = cmp.compress(cast(void[])str);
		result_buffer ~= cmp.flush();
		std.file.write(path ~ ".gz", result_buffer);
	}

	/**
	 * Shared SFTP round trip of getData and getHistorical: uploads the
	 * request, waits for the reply and appends the decoded rows.
	 *
	 * Params:
	 *   prefix          = tag placed in the generated request name.
	 *   requestTemplate = request text with REPLY_XXX still unresolved.
	 *   result          = rows to return in front of the data (the header
	 *                     row, or nothing).
	 *
	 * Returns: `result` unchanged when login fails or the reply does not
	 *          show up in time, otherwise `result` followed by the data
	 *          rows. Both sessions are disposed on every exit path.
	 */
	static string[][] requestViaSftp(string host, string user, string pass, string prefix, string requestTemplate, string[][] result, long interval, long retry)
	{
		auto file_name = getFileName(prefix);
		auto wait_name = file_name ~ ".gz";
		auto req_file_name = "/" ~ file_name ~ ".req";
		auto res_file_name = "/" ~ file_name ~ ".gz";

		auto sshSession = getSession(host, user, pass);
		if (sshSession is null)
			return result;
		// Scope guards run in reverse order: SFTP channel first, then SSH.
		scope(exit) sshSession.dispose();
		auto session = sshSession.newSFTP();
		scope(exit) session.dispose();

		auto upload_str = replace(requestTemplate, "REPLY_XXX", file_name);
		write_file(session, req_file_name, upload_str);
		if (!wait_file(session, wait_name, interval, retry))
			return result;
		result ~= decode_file(session, res_file_name);
		return result;
	}
public:
	/**
	 * Requests current values of `fields` for `idents`.
	 *
	 * An empty host or a host starting with "http" is served by the SOAP
	 * function soapGetData (with `user` passed as its `cert` argument);
	 * every other host is contacted over SFTP.
	 *
	 * Params:
	 *   withHeader = prepend a header row "BbgID" followed by `fields`.
	 *   interval   = seconds between two polls for the reply file.
	 *   retry      = maximum number of polls.
	 *
	 * Returns: the result rows. On the SFTP path only the optional header
	 *          row is returned when login fails or the reply times out.
	 */
	static string[][] getData(string host, string user, string pass, string[] fields, string[] idents, bool withHeader = true,  long interval = 10, long retry = 100)
	{
		if (host == "" || host.startsWith("http"))
		{
			return soapGetData(host, user, pass, fields, idents, withHeader, interval, retry);
		}
		string[][] result;
		if (withHeader)
		{
			result ~= ["BbgID"] ~ fields;
		}
		return requestViaSftp(host, user, pass, "D", getDataTemplateString(user, fields, idents), result, interval, retry);
	}

	/**
	 * Requests historical values of `fields` for `idents` between
	 * `startDate` and `endDate` from pricing source `source`.
	 *
	 * An empty host or a host starting with "http" is served by the SOAP
	 * function soapGetHistorical (with `user` passed as its `cert`
	 * argument); every other host is contacted over SFTP.
	 *
	 * Params:
	 *   withHeader = prepend a header row "BbgID", "Date" followed by `fields`.
	 *   interval   = seconds between two polls for the reply file.
	 *   retry      = maximum number of polls.
	 *
	 * Returns: the result rows. On the SFTP path only the optional header
	 *          row is returned when login fails or the reply times out.
	 */
	static string[][] getHistorical(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader = true,  long interval = 10, long retry = 100)
	{
		if (host == "" || host.startsWith("http"))
		{
			return soapGetHistorical(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry);
		}
		string[][] result;
		if (withHeader)
		{
			result ~= ["BbgID", "Date"] ~ fields;
		}
		return requestViaSftp(host, user, pass, "H", getHistoricalTemplateString(user, fields, idents, source, startDate, endDate), result, interval, retry);
	}

	/// Same as getData, with the rows rendered by convertCsv.
	static string getDataCsv(string host, string user, string pass, string[] fields, string[] idents, bool withHeader = true,  long interval = 10, long retry = 100)
	{
		return convertCsv(getData(host, user, pass, fields, idents, withHeader, interval, retry));
	}

	/// Same as getHistorical, with the rows rendered by convertCsv.
	static string getHistoricalCsv(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader = true,  long interval = 10, long retry = 100)
	{
		return convertCsv(getHistorical(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry));
	}

	/**
	 * Runs getData and stores the CSV text at `path`; see saveData for the
	 * meaning of `level` and the ".gz" suffix.
	 */
	static void downloadData(string host, string user, string pass, string[] fields, string[] idents, string path, int level = 0,  bool withHeader = true,  long interval = 10, long retry = 100)
	{
		auto csv = getDataCsv(host, user, pass, fields, idents, withHeader, interval, retry);
		saveData(csv, path, level);
	}

	/**
	 * Runs getHistorical and stores the CSV text at `path`; see saveData for
	 * the meaning of `level` and the ".gz" suffix.
	 */
	static void downloadHistorical(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, string path, int level = 0, bool withHeader = true,  long interval = 10, long retry = 100)
	{
		auto csv = getHistoricalCsv(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry);
		saveData(csv, path, level);
	}
};